home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from xml.dom import expatbuilder
- from ZSI import _copyright, _children, _attrs, _child_elements, _stringtypes, _backtrace, EvaluateException, ParseException, _valid_encoding, _Node, _find_attr, _resolve_prefix
- from ZSI.TC import AnyElement
- import types
- from ZSI.wstools.Namespaces import SOAP, XMLNS
- from ZSI.wstools.Utility import SplitQName
-
- _find_actor = lambda E: if not E.getAttributeNS(SOAP.ENV, 'actor'):
- pass
-
- _find_mu = lambda E: E.getAttributeNS(SOAP.ENV, 'mustUnderstand')
-
- _find_root = lambda E: E.getAttributeNS(SOAP.ENC, 'root')
-
- _find_id = lambda E: _find_attr(E, 'id')
-
- class DefaultReader:
- fromString = staticmethod(expatbuilder.parseString)
- fromStream = staticmethod(expatbuilder.parse)
-
-
- class ParsedSoap:
- defaultReaderClass = DefaultReader
-
- def __init__(self, input, readerclass = None, keepdom = False, trailers = False, resolver = None, envelope = True, **kw):
- self.readerclass = readerclass
- self.keepdom = keepdom
- if not self.readerclass:
- self.readerclass = self.defaultReaderClass
-
-
- try:
- self.reader = self.readerclass()
- if type(input) in _stringtypes:
- self.dom = self.reader.fromString(input)
- else:
- self.dom = self.reader.fromStream(input)
- except Exception:
- e = None
- raise
-
- self.ns_cache = {
- id(self.dom): {
- 'xml': XMLNS.XML,
- 'xmlns': XMLNS.BASE,
- '': '' } }
- self.trailers = trailers
- self.resolver = resolver
- self.id_cache = { }
- c = _[1]
- if len(c) != 1:
- raise ParseException('Document has extra child elements', 0)
-
- if envelope is False:
- self.body_root = c[0]
- return None
-
- elt = c[0]
- if elt.localName != 'Envelope' or elt.namespaceURI != SOAP.ENV:
- raise ParseException('Document has "' + elt.localName + '" element, not Envelope', 0)
-
- self._check_for_legal_children('Envelope', elt)
- for a in _attrs(elt):
- name = a.nodeName
- if name.find(':') == -1 and name not in ('xmlns', 'id'):
- raise ParseException('Unqualified attribute "' + name + '" in Envelope', 0)
- continue
-
- self.envelope = elt
- if not _valid_encoding(self.envelope):
- raise ParseException('Envelope has invalid encoding', 0)
-
- c = _[2]
- elt = c[0]
- if elt.localName == 'Header' and elt.namespaceURI == SOAP.ENV:
- self._check_for_legal_children('Header', elt)
- self._check_for_pi_nodes(_children(elt), 1)
- self.header = c.pop(0)
- self.header_elements = _child_elements(self.header)
- else:
- self.header = None
- self.header_elements = []
- if len(c) == 0:
- raise ParseException('Envelope has header but no Body', 0)
-
- elt = c.pop(0)
- if elt.localName != 'Body' or elt.namespaceURI != SOAP.ENV:
- if self.header:
- raise ParseException('Header followed by "' + elt.localName + '" element, not Body', 0, elt, self.dom)
- else:
- raise ParseException('Document has "' + elt.localName + '" element, not Body', 0, elt, self.dom)
-
- self._check_for_legal_children('Body', elt, 0)
- self._check_for_pi_nodes(_children(elt), 0)
- self.body = elt
- if not _valid_encoding(self.body):
- raise ParseException('Body has invalid encoding', 0)
-
- if not self.trailers:
- if len(c):
- raise ParseException('Element found after Body', 0, elt, self.dom)
-
- else:
- self.trailer_elements = c
- for elt in self.trailer_elements:
- if not elt.namespaceURI:
- raise ParseException('Unqualified trailer element', 0, elt, self.dom)
- continue
-
- self.body_root = None
- no = []
- maybe = []
- for elt in _child_elements(self.body):
- root = _find_root(elt)
- if root == '1':
- if self.body_root:
- raise ParseException('Multiple seralization roots found', 0, elt, self.dom)
-
- self.body_root = elt
- continue
- if root == '0':
- no.append(elt)
- continue
- if not root:
- maybe.append(elt)
- continue
- raise ParseException('Illegal value for root attribute', 0, elt, self.dom)
-
- if self.body_root is None:
- if len(maybe):
- self.body_root = maybe[0]
- else:
- raise ParseException('No serialization root found', 0, self.body, self.dom)
-
- if not _valid_encoding(self.body_root):
- raise ParseException('Invalid encoding', 0, elt, self.dom)
-
- rootid = id(self.body_root)
- self.data_elements = _[3]
- self._check_for_pi_nodes(self.data_elements, 0)
-
-
- def __del__(self):
-
- try:
- if not self.keepdom:
- self.reader.releaseNode(self.dom)
- except:
- pass
-
-
-
- def _check_for_legal_children(self, name, elt, mustqualify = 1):
- inheader = name == 'Header'
- for n in _children(elt):
- t = n.nodeType
- if t == _Node.COMMENT_NODE:
- continue
-
- if t != _Node.ELEMENT_NODE:
- if t == _Node.TEXT_NODE and n.nodeValue.strip() == '':
- continue
-
- raise ParseException('Non-element child in ' + name, inheader, elt, self.dom)
-
- if mustqualify and not (n.namespaceURI):
- raise ParseException('Unqualified element "' + n.nodeName + '" in ' + name, inheader, elt, self.dom)
- continue
-
-
-
- def _check_for_pi_nodes(self, list, inheader):
- list = list[:]
- while list:
- elt = list.pop()
- t = elt.nodeType
- if t == _Node.PROCESSING_INSTRUCTION_NODE:
- raise ParseException('Found processing instruction "<?' + elt.nodeName + '...>"', inheader, elt.parentNode, self.dom)
- elif t == _Node.DOCUMENT_TYPE_NODE:
- raise ParseException('Found DTD', inheader, elt.parentNode, self.dom)
-
- list += _children(elt)
-
-
- def Backtrace(self, elt):
- return _backtrace(elt, self.dom)
-
-
- def FindLocalHREF(self, href, elt, headers = 1):
- if href[0] != '#':
- raise EvaluateException('Absolute HREF ("%s") not implemented' % href, self.Backtrace(elt))
-
- frag = href[1:]
- e = self.id_cache.get(frag)
- if e:
- return e
-
- list = self.data_elements[:] + [
- self.body_root]
- if headers:
- list.extend(self.header_elements)
-
- while list:
- e = list.pop()
- if e.nodeType == _Node.ELEMENT_NODE:
- nodeid = _find_id(e)
- if nodeid:
- self.id_cache[nodeid] = e
- if nodeid == frag:
- return e
-
-
-
- list += _children(e)
- raise EvaluateException('Can\'t find node for HREF "%s"' % href, self.Backtrace(elt))
-
-
- def ResolveHREF(self, uri, tc, **keywords):
- r = getattr(tc, 'resolver', self.resolver)
- if not r:
- raise EvaluateException('No resolver for "' + uri + '"')
-
-
- try:
- if type(uri) == types.UnicodeType:
- uri = str(uri)
-
- retval = r(uri, tc, self, **keywords)
- except Exception:
- e = None
- raise EvaluateException('Can\'t resolve "' + uri + '" (' + str(e.__class__) + '): ' + str(e))
-
- return retval
-
-
- def GetMyHeaderElements(self, actorlist = None):
- if actorlist is None:
- actorlist = [
- None,
- SOAP.ACTOR_NEXT]
- else:
- actorlist = list(actorlist) + [
- None,
- SOAP.ACTOR_NEXT]
- return _[1]
-
-
- def GetElementNSdict(self, elt):
- d = self.ns_cache.get(id(elt))
- if not d:
- if elt != self.dom:
- d = self.GetElementNSdict(elt.parentNode)
-
- for a in _attrs(elt):
- if a.namespaceURI == XMLNS.BASE:
- if a.localName == 'xmlns':
- d[''] = a.nodeValue
- else:
- d[a.localName] = a.nodeValue
- a.localName == 'xmlns'
-
- self.ns_cache[id(elt)] = d
-
- return d.copy()
-
-
- def GetDomAndReader(self):
- return (self.dom, self.reader)
-
-
- def IsAFault(self):
- e = self.body_root
- if not e:
- return 0
-
- if e.namespaceURI == SOAP.ENV:
- pass
- return e.localName == 'Fault'
-
-
- def Parse(self, how):
- if type(how) == types.ClassType:
- how = how.typecode
-
- return how.parse(self.body_root, self)
-
-
- def WhatMustIUnderstand(self):
- return _[1]
-
-
- def WhatActorsArePresent(self):
- results = []
- for E in self.header_elements:
- a = _find_actor(E)
- if a not in [
- None,
- SOAP.ACTOR_NEXT]:
- results.append(a)
- continue
-
- return results
-
-
- def ParseHeaderElements(self, ofwhat):
- d = { }
- lenofwhat = len(ofwhat)
- c = self.header_elements[:]
- crange = range(len(self.header_elements))
- for i in range(lenofwhat):
- pass
-
- return d
-
-
- if __name__ == '__main__':
- print _copyright
-
-